Apache Flink-এর Table API এবং SQL হলো উচ্চ-স্তরের APIs যা ডেটা প্রসেসিংকে সহজ এবং এক্সপ্রেসিভ করে তোলে। এগুলো ব্যবহার করে আমরা স্ট্রিম এবং ব্যাচ ডেটা খুব সহজেই প্রক্রিয়াকরণ করতে পারি, যেখানে Table API জাভা বা স্কালা API হিসেবে কাজ করে এবং SQL পরিচিত SQL সিনট্যাক্স ব্যবহার করে ডেটা প্রক্রিয়াকরণ করে।
Table API একটি রিচ, রিলেশনাল API যা ডেটা প্রসেসিং ও ট্রান্সফরমেশন করতে ট্যাবুলার ডেটা (টেবিল বা ভিউ) ব্যবহার করে। এটি স্ট্রিম এবং ব্যাচ উভয় ডেটার জন্য কাজ করে এবং এতে SQL-এর মতো অপারেশন যেমন select
, filter
, join
, groupBy
ইত্যাদি সাপোর্ট করে।
// Flink Execution Environment এবং Table Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// সোর্স ডেটা তৈরি করা (ডেমো ডেটাসেট হিসেবে)
DataStream<Tuple2<Integer, String>> dataStream = env.fromElements(
new Tuple2<>(1, "Alice"),
new Tuple2<>(2, "Bob"),
new Tuple2<>(3, "Charlie")
);
// DataStream থেকে টেবিল তৈরি করা
Table table = tableEnv.fromDataStream(dataStream, $("id"), $("name"));
// টেবিল থেকে ডেটা প্রসেসিং (ফিল্টার অপারেশন)
Table filteredTable = table.filter($("id").isGreater(1));
// প্রসেস করা টেবিলকে DataStream এ কনভার্ট করা
DataStream<Tuple2<Integer, String>> resultStream = tableEnv.toDataStream(filteredTable);
বর্ণনা: এখানে, একটি DataStream
থেকে একটি টেবিল তৈরি করা হয়েছে এবং তারপর একটি ফিল্টার অপারেশন প্রয়োগ করা হয়েছে যেখানে id
মান ১-এর বেশি। প্রসেস করার পর, এটি আবার DataStream
এ কনভার্ট করা হয়েছে।
SQL API Flink-এর ট্যাবুলার API-এর একটি অংশ যা স্ট্যান্ডার্ড SQL সিনট্যাক্স ব্যবহার করে স্ট্রিম এবং ব্যাচ ডেটা প্রসেস করতে দেয়। SQL API ব্যবহার করে ডেটা প্রসেসিং করার জন্য আপনাকে Table Environment তৈরি করতে হয় এবং টেবিল বা ভিউ রেজিস্টার করতে হয়।
// Flink Execution Environment এবং Table Environment তৈরি করা
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// সোর্স ডেটা তৈরি করা
DataStream<Tuple2<Integer, String>> dataStream = env.fromElements(
new Tuple2<>(1, "Alice"),
new Tuple2<>(2, "Bob"),
new Tuple2<>(3, "Charlie")
);
// টেবিল রেজিস্টার করা
tableEnv.createTemporaryView("people", tableEnv.fromDataStream(dataStream, $("id"), $("name")));
// SQL Query প্রয়োগ করা
Table result = tableEnv.sqlQuery("SELECT * FROM people WHERE id > 1");
// টেবিলকে DataStream এ কনভার্ট করা
DataStream<Tuple2<Integer, String>> resultStream = tableEnv.toDataStream(result);
বর্ণনা: এখানে, একটি টেবিল তৈরি করা হয়েছে এবং people
নামে একটি টেম্পোরারি ভিউ হিসেবে রেজিস্টার করা হয়েছে। তারপর একটি SQL SELECT
কোয়েরি প্রয়োগ করা হয়েছে যেখানে id
মান ১-এর বেশি।
Flink Table API এবং SQL API একত্রে ব্যবহার করা সম্ভব, যা ডেটা প্রসেসিং আরও ফ্লেক্সিবল করে তোলে। আপনি Table API এর মাধ্যমে টেবিল তৈরি ও প্রসেস করতে পারেন এবং SQL কোয়েরি ব্যবহার করে আরও জটিল অপারেশন করতে পারেন।
Table API এবং SQL API উভয়েই বিভিন্ন সোর্স ও সিংকের সাথে ইন্টিগ্রেট করা যায় যেমন Kafka, HBase, এবং RDBMS। উদাহরণস্বরূপ, আপনি Kafka থেকে ডেটা ইনজেস্ট করতে এবং প্রক্রিয়াকৃত ডেটা অন্য কোনো সিস্টেমে পাঠাতে পারেন।
// Kafka সোর্স এবং সিংক তৈরি করা
String kafkaDDL = "CREATE TABLE kafka_table (" +
" id INT," +
" name STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'input_topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")";
tableEnv.executeSql(kafkaDDL);
// SQL কোয়েরি প্রয়োগ করা এবং আউটপুট রেজিস্টার করা
Table result = tableEnv.sqlQuery("SELECT id, name FROM kafka_table WHERE id > 1");
tableEnv.executeSql("CREATE TABLE output_table (" +
" id INT," +
" name STRING" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'output_topic'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'format' = 'json'" +
")");
// টেবিলের ডেটা সিংকে লিখে দেওয়া
result.executeInsert("output_table");
বর্ণনা: এখানে, kafka_table
নামে একটি Kafka সোর্স রেজিস্টার করা হয়েছে এবং একটি SQL কোয়েরি প্রয়োগ করে প্রক্রিয়াকৃত ডেটা output_table
নামক Kafka সিংকে পাঠানো হয়েছে।
Apache Flink-এর Table API এবং SQL API ডেটা প্রসেসিংকে আরও সহজ এবং কার্যকর করে তোলে। এই API-গুলো স্ট্রিম এবং ব্যাচ ডেটার জন্য এক্সপ্রেসিভ, ফ্লেক্সিবল এবং পারফরম্যান্স-অপটিমাইজড সমাধান প্রদান করে। Flink-এর মাধ্যমে সহজেই বড় আকারের এবং জটিল ডেটা প্রসেসিং কাজগুলো সম্পন্ন করা সম্ভব।
আরও দেখুন...